package com.intersult.nutils.schedule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Runs a collection of {@link Callable}s on an {@link Executor}, keeping at most
 * {@code maxThreads} of them in flight at a time, and collects their results.
 * <p>
 * The work is driven by the thread that calls {@link #get()} or
 * {@link #get(long, TimeUnit)}: that thread submits the tasks, waits for completions
 * and cancels tasks whose individual deadline has passed. Results are returned in
 * completion order; tasks that timed out or were cancelled contribute no result.
 *
 * @param <V> result type of the individual tasks
 * @author Dieter Kaeppel
 * @date   16.11.2011
 */
public class TimeoutExecutor<V> implements Future<List<V>> {
	/**
	 * A task together with its deadline. Reports itself to the completion queue
	 * once it has finished without being cancelled.
	 */
	private class TimeoutFuture extends FutureTask<V> {
		/** Deadline as a {@link System#nanoTime()} value. */
		private final long end;
		
		TimeoutFuture(Callable<V> callable, long end) {
			super(callable);
			this.end = end;
		}

		/** Hands the finished task to the collector; cancelled tasks are not reported. */
		@Override
		protected void done() {
			if (!isCancelled())
				completionQueue.add(this);
		}
		
		/**
		 * @return {@code true} once the deadline has passed
		 */
		public boolean isTimeout() {
			// nanoTime values must be compared by difference: "now + timeout" wraps around
			// for very large timeouts (e.g. Long.MAX_VALUE), which a direct "<" misreads
			// as an already expired deadline.
			return System.nanoTime() - end > 0;
		}

		/**
		 * @return the deadline as a {@link System#nanoTime()} value
		 */
		public long getEnd() {
			return end;
		}
	}

	/** Tasks currently in flight, in submission order (the head has the earliest deadline). */
	final BlockingQueue<TimeoutFuture> submissionQueue;
	/** Tasks that finished without being cancelled and whose result was not collected yet. */
	final BlockingQueue<TimeoutFuture> completionQueue;
	private final Executor executor;
	/** Written by {@link #cancel(boolean)}, read by the collecting thread, hence volatile. */
	private volatile boolean cancelled;
	private final int maxThreads;
	private final Collection<Callable<V>> callables;

	/**
	 * @param executor   executor the tasks are handed to
	 * @param callables  tasks to run; may be empty
	 * @param maxThreads maximum number of tasks in flight at the same time, must be positive
	 * @throws NullPointerException     if {@code executor} or {@code callables} is {@code null}
	 * @throws IllegalArgumentException if {@code maxThreads} is not positive
	 */
	public TimeoutExecutor(Executor executor, Collection<Callable<V>> callables, int maxThreads) {
		if (executor == null || callables == null)
			throw new NullPointerException();
		this.maxThreads = maxThreads;
		this.executor = executor;
		this.callables = callables;
		submissionQueue = new LinkedBlockingQueue<TimeoutFuture>(maxThreads);
		// A LinkedBlockingQueue rejects a capacity of 0, so an empty task collection
		// still needs a capacity of at least 1.
		completionQueue = new LinkedBlockingQueue<TimeoutFuture>(Math.max(1, callables.size()));
	}

	/**
	 * Stops the submission of further tasks and cancels the tasks currently in flight.
	 * Cancelled tasks are not reported as completed, so a concurrent
	 * {@link #get(long, TimeUnit)} only drops them once their deadline has passed.
	 *
	 * @param mayInterruptIfRunning passed on to every in-flight task
	 * @return {@code false} if no task is in flight, {@code true} otherwise
	 */
	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		if (isDone())
			return false;
		cancelled = true;
		for (Future<V> future : submissionQueue) {
			future.cancel(mayInterruptIfRunning);
		}
		return cancelled;
	}

	/**
	 * Runs all tasks without a practical time limit per task.
	 *
	 * @return the results in completion order
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 * @throws ExecutionException   if a task threw an exception
	 */
	@Override
	public List<V> get() throws InterruptedException, ExecutionException {
		return get(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
	}

	/**
	 * Runs all tasks, at most {@code maxThreads} at a time, and collects their results.
	 * The timeout applies to each task individually, counted from its submission;
	 * a task exceeding it is cancelled with interruption and yields no result.
	 *
	 * @param timeout time limit for each single task
	 * @param unit    unit of {@code timeout}
	 * @return the results of the tasks that completed, in completion order
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 * @throws ExecutionException   if a task threw an exception; tasks still in flight
	 *                              are not cancelled in that case
	 */
	@Override
	public List<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
		long nanoTimeout = unit.toNanos(timeout);
		List<V> result = new ArrayList<V>(callables.size());
		Iterator<Callable<V>> iterator = callables.iterator();
		while (iterator.hasNext() || !submissionQueue.isEmpty()) {
			// Fill the free slots with new tasks unless cancel() stopped the submission.
			while (!cancelled && submissionQueue.size() < maxThreads && iterator.hasNext()) {
				long end = System.nanoTime() + nanoTimeout;
				TimeoutFuture future = new TimeoutFuture(iterator.next(), end);
				submissionQueue.add(future);
				executor.execute(future);
			}
			TimeoutFuture head = submissionQueue.peek();
			if (head == null) {
				// Nothing in flight and nothing was submitted: submission was cancelled
				// while callables were still pending, so there is nothing left to wait for.
				break;
			}
			// Wait for the next completion, but no longer than the earliest deadline.
			long remain = head.getEnd() - System.nanoTime();
			TimeoutFuture future = completionQueue.poll(remain, TimeUnit.NANOSECONDS);
			if (future != null) {
				submissionQueue.remove(future);
				result.add(future.get());
			}
			// Drop expired tasks; all tasks share the same timeout, so deadlines are
			// ordered like the queue and checking the head is sufficient.
			while (!submissionQueue.isEmpty()) {
				if (submissionQueue.peek().isTimeout()) {
					submissionQueue.poll().cancel(true);
				} else {
					break;
				}
			}
		}
		return result;
	}

	/**
	 * @return {@code true} once {@link #cancel(boolean)} has taken effect
	 */
	@Override
	public boolean isCancelled() {
		return cancelled;
	}

	/**
	 * @return {@code true} if no task is currently in flight; this is also the case
	 *         before any task has been submitted
	 */
	@Override
	public boolean isDone() {
		return submissionQueue.isEmpty();
	}
}